package spark练习题.练习题01_RDD算子

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class e01 {

  val conf = new SparkConf().setAppName("test").setMaster("local[6]")
  val sc = new SparkContext(conf)

  //1、创建一个1-10数组的RDD，将所有元素*2形成新的RDD
  @Test
  def e01(): Unit ={
    val rdd = sc.parallelize(1 to 10)
    val maprdd = rdd.map(_*2)
    maprdd.collect().foreach(println(_))
  }
  //2、创建一个10-20数组的RDD，使用mapPartitions将所有元素*2形成新的RDD
  @Test
  def e02(): Unit ={
    val rdd = sc.parallelize(10 to 20,2)
    val mappartitionrdd = rdd.mapPartitions(_.map(_*2))
    mappartitionrdd.collect().foreach(println(_))
  }
  //3、创建一个元素为 1-5 的RDD，运用 flatMap创建一个新的 RDD，新的 RDD 为原 RDD 每个元素的 平方和三次方 来组成 1,1,4,8,9,27..
  @Test
  def e03(): Unit ={
    val rdd = sc.parallelize(1 to 5)
    val flatrdd = rdd.flatMap(x => Array(Math.pow(x,2).toInt,Math.pow(x,3).toInt))
    flatrdd.foreach(println(_))
  }
  //4、创建一个 4 个分区的 RDD数据为Array(10,20,30,40,50,60)，使用glom将每个分区的数据放到一个数组
  @Test
  def e04(): Unit ={
    val rdd = sc.parallelize(Array(10,20,30,40,50,60),4)
    val glom = rdd.glom().collect()
    glom.foreach(println(_))
    for(i <- glom;j <- i)
      println(j)
  }
  //5、创建一个 RDD数据为Array(1, 3, 4, 20, 4, 5, 8)，按照元素的奇偶性进行分组
  @Test
  def e05(): Unit ={
    val rdd = sc.parallelize(Array(1, 3, 4, 20, 4, 5, 8))
    val rdd1 = rdd.groupBy(
      item =>
        if (item%2==0) "偶数" else "基数"
    )
    rdd1.collect().foreach(print(_))
  }
  //6、创建一个 RDD（由字符串组成）Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong")，过滤出一个新 RDD（包含“xiao”子串）
  @Test
  def e06(): Unit ={
    val rdd = sc.parallelize(Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"))
    val filter = rdd.filter(_.contains("xiao"))
    filter.collect().foreach(println(_))
  }
  //7、创建一个 RDD数据为1 to 10，请使用sample不放回抽样
  @Test
  def e07(): Unit ={
    val rdd = sc.parallelize(1 to 10)
    val sample = rdd.sample(withReplacement = false,0.5,2)
    sample.collect().foreach(println(_))
  }
  //8、创建一个 RDD数据为1 to 10，请使用sample放回抽样
  @Test
  def e08(): Unit ={
    val rdd = sc.parallelize(1 to 10)
    val sample = rdd.sample(withReplacement = true,0.5,1)
    sample.collect().foreach(println(_))
  }
  //9、创建一个 RDD数据为Array(10,10,2,5,3,5,3,6,9,1),对 RDD 中元素执行去重操作
  @Test
  def e09(){
    val rdd = sc.parallelize(Array(10,10,2,5,3,5,3,6,9,1))
    val dupl = rdd.distinct()
    dupl.collect().foreach(println(_))
  }
  //10、创建一个分区数为5的 RDD，数据为0 to 100，之后使用coalesce再重新减少分区的数量至 2
  //   创建一个分区数为5的 RDD，数据为0 to 100，之后使用repartition再重新减少分区的数量至 3
  @Test
  def e10(){
    val rdd = sc.parallelize(0 to 100,5)
    val coalesce = rdd.coalesce(2)
    println(coalesce.partitions.size)

    val repartition = rdd.repartition(3)
    println(repartition.partitions.size)
  }
  //11、创建一个 RDD数据为1,3,4,10,4,6,9,20,30,16,请给RDD进行分别进行升序和降序排列
  @Test
  def e11(): Unit ={
    val rdd = sc.parallelize(Array(1,3,4,10,4,6,9,20,30,16))
    val desc = rdd.sortBy(x => x, false)
    val asc = rdd.sortBy(x => x, true)
    desc.collect().foreach(println(_ ))
    asc.collect().foreach(println(_))
  }
  //12、创建两个RDD，分别为rdd1和rdd2数据分别为1 to 6和1 to 10，求并集
  //    创建两个RDD，分别为rdd1和rdd2数据分别为1 to 6和4 to 10，计算差集，两个都算
  //    创建两个RDD，分别为rdd1和rdd2数据分别为1 to 6和1 to 10，计算交集
  //    创建两个RDD，分别为rdd1和rdd2数据分别为1 to 6和1 to 10，计算 2 个 RDD 的笛卡尔积
  @Test
  def e12(): Unit ={
    val rdd = sc.parallelize(1 to 6)
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = rdd.union(rdd1)
    rdd2.collect().foreach(println(_))

    val rdd3 = sc.parallelize(1 to 6)
    val rdd4 = sc.parallelize(4 to 10)
    val rdd5 = rdd3.subtract(rdd4)
    val rdd5_1= rdd4.subtract(rdd3)
    rdd5.collect().foreach(println(_))
    rdd5_1.collect().foreach(println(_))

    val rdd6 = sc.parallelize(1 to 6)
    val rdd7 = sc.parallelize(1 to 10)
    val rdd8 = rdd6.intersection(rdd7)
    rdd8.collect().foreach(println(_))

    val rdd6_1 = sc.parallelize(1 to 6)
    val rdd7_1 = sc.parallelize(1 to 10)
    val rdd8_1 = rdd6_1.cartesian(rdd7_1)
    rdd8_1.collect().foreach(println(_))
  }
  //13、创建两个RDD，分别为rdd1和rdd2数据分别为1 to 5和11 to 15，对两个RDD拉链操作
  @Test
  def e13(): Unit ={
    val rdd = sc.parallelize(1 to 5)
    val rdd1 = sc.parallelize(11 to 15)
    val rdd2 = rdd.zip(rdd1)
    rdd2.collect().foreach(println(_))
  }
  //14、创建一个RDD数据为List(("female",1),("male",5),("female",5),("male",2))，请计算出female和male的总数分别为多少
  @Test
  def e14(): Unit ={
    val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
    val reduce = rdd.reduceByKey((curr,agg)=>curr+agg)
    reduce.collect().foreach(println(_))
  }
  //15、创建一个有两个分区的 RDD数据为List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8))，取出每个分区相同key对应值的最大值，然后相加
  @Test
  def e15(): Unit ={
    val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
    val rdd1= rdd.glom().collect().foreach(x => println(x.mkString(",")))  // 将数组转换成字符串打印出来
    val rdd2 = rdd.aggregateByKey(0)(math.max(_,_),_+_)
    rdd2.collect().foreach(println(_))
  }
  //16、创建一个有两个分区的 pairRDD数据为Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))，根据 key 计算每种 key 的value的平均值
  @Test
  def e16(): Unit ={
    val rdd = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
    // 1
    val rdd1 = rdd.groupByKey().map(x=>x._1->x._2.sum/x._2.size)
    rdd1.collect().foreach(println(_))
    // 2
    val rdd2 = rdd.map(x=>(x._1,(x._2,1)))
      .reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
      .map(x=>(x._1,x._2._1/x._2._2))
    rdd2.collect().foreach(println(_))
  }
  //17、统计出每一个省份广告被点击次数的 TOP3
  //  数据结构：时间戳，省份，城市，用户，广告 字段使用空格分割。
  //  样本如下：
  //  1516609143867 6 7 64 26
  //  1516609143861 8 4 75 28
  //  1516609143864 8 7 87 22
  //  1516609143865 6 7 64 36
  //  1516609143866 9 4 75 38
  //  1516609143868 6 7 87 32
  //  1516609143857 6 7 64 16
  //  1516609143859 9 4 75 48
  //  1516609143859 1 7 87 42
  //  1516609143847 6 7 64 26
  //  1516609143859 9 4 75 48
  //  1516609143849 1 7 87 42
  //  1516609143837 6 7 64 86
  //  1516609143839 9 4 75 78
  //  1516609143829 1 7 87 32
  //  1516609143827 6 7 64 76
  //  1516609143829 9 4 75 98
  //  1516609143819 1 7 87 112
  @Test
  def e17(): Unit ={
    val rdd = sc.textFile("G:\\Projects\\IdeaProjects\\Spark_Competition\\dataset\\ads")
    //rdd.take(2).foreach(println(_))
    rdd.map { x => var data = x.split(" ");(data(1),(data(4),1))}
      .groupByKey()
//    (8,CompactBuffer((28,1), (22,1)))
//    (6,CompactBuffer((26,1), (36,1), (32,1), (16,1), (26,1), (86,1), (76,1)))
//    (9,CompactBuffer((38,1), (48,1), (48,1), (78,1), (98,1)))
//    (1,CompactBuffer((42,1), (42,1), (32,1), (112,1)))
      .map{
        case (province,list) => {
          val tuples =
            list.groupBy(_._1).map(x=>(x._1,x._2.size)).toList.sortWith((x,y)=>x._2>y._2).take(3)
          (province,tuples)
      }
    }.collect().sortBy(_._1).foreach(println)
  }
  //18、读取 people.json.json 数据的文件, 每行是一个 json.json 对象，进行解析输出
  @Test
  def e18(): Unit ={
    import scala.util.parsing.json.JSON
    val rdd = sc.textFile("hdfs://192.168.64.129:9000/user/root/spark/sparkSql/people.json.json")
    val result = rdd.map(JSON.parseFull)
    result.collect().foreach(println(_))
  }
  //19、保存一个 SequenceFile 文件，使用spark创建一个RDD数据为Array(("a", 1),("b", 2),("c", 3))，保存为SequenceFile格式的文件到hdfs上
  //    读取19题的SequenceFile 文件并输出
  @Test
  def e19(): Unit ={
    val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
    // 写入
    //rdd.saveAsSequenceFile("hdfs://192.168.64.129:9000/user/root/spark/sparkSql/SequenceFile")
    // 读取
    val rdd1 = sc.sequenceFile[String,Int]("hdfs://192.168.64.129:9000/user/root/spark/sparkSql/SequenceFile/part-00001")
    rdd1.collect().foreach(println(_))
  }

  // 20.读写 objectFile 文件，把 RDD 保存为objectFile，RDD数据为Array(("a", 1),("b", 2),("c", 3))，并进行读取出来
  @Test
  def e20(): Unit ={
    val rdd = sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))
    //rdd.saveAsObjectFile("src/main/scala/spark练习题/练习题01/data/output20200407/20200407_objectFile")
    val rdd1 = sc.objectFile("src/main/scala/spark练习题/练习题01/data/output20200407/20200407_objectFile/part-00001")
    println(rdd1)
  }

  //21.使用内置累加器计算Accumulator.txt文件中空行的数量
  @Test
  def e21(): Unit ={
    val rdd = sc.textFile("G:\\Projects\\IdeaProjects\\Spark_Competition\\dataset\\Accumulator")
    var count = sc.longAccumulator("count")
    rdd.foreach(x => if (x=="") count.add(1))
    println(count.value)
  }

 //22.使用Spark广播变量
 //用户表：
 //id name age gender(0|1)
 //001,刘向前,18,0
 //002,冯  剑,28,1
 //003,李志杰,38,0
 //004,郭  鹏,48,2
 //要求，输出用户信息，gender必须为男或者女，不能为0,1
 //使用广播变量把Map("0" -> "女", "1" -> "男")设置为广播变量，最终输出格式为
 //001,刘向前,18,女
 //003,李志杰,38,女
 //002,冯  剑,28,男
 //004,郭  鹏,48,男
  @Test
  def e22(): Unit ={
    val rdd = sc.textFile("G:\\Projects\\IdeaProjects\\Spark_Competition\\dataset\\students")
    val broadcast = sc.broadcast(Map("0"->"女","1"->"男"))
    rdd.foreach{
      x =>
        var datas = x.split(",");
        println( datas(0)+","+datas(1)+","+datas(2)+","+ broadcast.value(datas(3)) )
    }
  }

  //23.mysql创建一个数据库spark，在此数据库中创建一张表
  //CREATE TABLE `user` (
  //  `id` int(11) NOT NULL AUTO_INCREMENT,
  //  `username` varchar(32) NOT NULL COMMENT '用户名称',
  //  `birthday` date DEFAULT NULL COMMENT '生日',
  //  `sex` char(1) DEFAULT NULL COMMENT '性别',
  //  `address` varchar(256) DEFAULT NULL COMMENT '地址',
  //  PRIMARY KEY (`id`)
  //) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  //数据依次是：姓名 生日 性别 省份
  //请使用spark将以上数据写入mysql中，并读取出来
  @Test
  def e23(): Unit ={
    val rdd = sc.textFile("G:\\Projects\\IdeaProjects\\Spark_Competition\\dataset\\user")
    // 配置数据库相关信息
    val Driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/spark"
    val username = "root"
    val password = "123456"
    //插入数据
    rdd.foreachPartition{
      data =>
        // 创数据库连接
        Class.forName(Driver)
        val connection = java.sql.DriverManager.getConnection(url,username,password)
        val sql = "insert into user values (Null,?,?,?,?)"
        data.foreach{
          tuples => {
            val datas = tuples.split(" ")
            val statement = connection.prepareStatement(sql)
            statement.setString(1,datas(0))
            statement.setString(2,datas(1))
            statement.setString(3,datas(2))
            statement.setString(4,datas(3))
            statement.executeUpdate()
            statement.close()
          }
        }
        connection.close()
    }
  }
  // 读取数据库
  @Test
  def e24(): Unit ={
    // 配置数据库相关信息
    val Driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/spark"
    val username = "root"
    val password = "123456"
    val sql = "select * from user where id between ? and ?"
    val jdbcRDD = new JdbcRDD(
        sc,
        () => {
          Class.forName(Driver)
          java.sql.DriverManager.getConnection(url,username,password)},
        sql,0,44,3,
        result => {
          println(s"id=${result.getInt(1)},username=${result.getString(2)}" +
            s",birthday=${result.getDate(3)},sex${result.getString(4)}" +
            s",address${result.getString(5)}")
        }
    )
    jdbcRDD.collect()
  }

}
